/*
 *  Copyright 1999-2019 Seata.io Group.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package io.seata.server.storage.file.store;

import com.alipay.sofa.jraft.util.BufferUtils;
import io.seata.common.exception.StoreException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.server.session.BranchSession;
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;
import io.seata.server.session.SessionManager;
import io.seata.server.storage.file.FlushDiskMode;
import io.seata.server.storage.file.ReloadableStore;
import io.seata.server.storage.file.TransactionWriteStore;
import io.seata.server.store.AbstractTransactionStoreManager;
import io.seata.server.store.SessionStorable;
import io.seata.server.store.StoreConfig;
import io.seata.server.store.TransactionStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import static io.seata.core.context.RootContext.MDC_KEY_BRANCH_ID;

/**
 * The type File transaction store manager.
 *
 * @author slievrly
 */
public class FileTransactionStoreManager extends AbstractTransactionStoreManager
		implements TransactionStoreManager, ReloadableStore {

	private static final Logger LOGGER = LoggerFactory.getLogger(FileTransactionStoreManager.class);

	private static final int MAX_THREAD_WRITE = 1;

	private ExecutorService fileWriteExecutor;

	private volatile boolean stopping = false;

	private static final int MAX_SHUTDOWN_RETRY = 3;

	private static final int SHUTDOWN_CHECK_INTERVAL = 1 * 1000;

	private static final int MAX_WRITE_RETRY = 5;

	private static final String HIS_DATA_FILENAME_POSTFIX = ".1";

	private static final AtomicLong FILE_TRX_NUM = new AtomicLong(0);

	private static final AtomicLong FILE_FLUSH_NUM = new AtomicLong(0);

	private static final int MARK_SIZE = 4;

	private static final int MAX_WAIT_TIME_MILLS = 2 * 1000;

	private static final int MAX_FLUSH_TIME_MILLS = 2 * 1000;

	private static final int MAX_FLUSH_NUM = 10;

	private static final int PER_FILE_BLOCK_SIZE = 65535 * 8;

	private static final long MAX_TRX_TIMEOUT_MILLS = 30 * 60 * 1000;

	private static volatile long trxStartTimeMills = System.currentTimeMillis();

	private File currDataFile;

	private RandomAccessFile currRaf;

	private FileChannel currFileChannel;

	private long recoverCurrOffset = 0;

	private long recoverHisOffset = 0;

	private SessionManager sessionManager;

	private String currFullFileName;

	private String hisFullFileName;

	private WriteDataFileRunnable writeDataFileRunnable;

	private ReentrantLock writeSessionLock = new ReentrantLock();

	private volatile long lastModifiedTime;

	private static final int MAX_WRITE_BUFFER_SIZE = StoreConfig.getFileWriteBufferCacheSize();

	private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(MAX_WRITE_BUFFER_SIZE);

	private static final FlushDiskMode FLUSH_DISK_MODE = StoreConfig.getFlushDiskMode();

	private static final int MAX_WAIT_FOR_FLUSH_TIME_MILLS = 2 * 1000;

	private static final int MAX_WAIT_FOR_CLOSE_TIME_MILLS = 2 * 1000;

	private static final int INT_BYTE_SIZE = 4;

	/**
	 * Instantiates a new File transaction store manager.
	 * @param fullFileName the dir path
	 * @param sessionManager the session manager
	 * @throws IOException the io exception
	 */
	public FileTransactionStoreManager(String fullFileName, SessionManager sessionManager) throws IOException {
		initFile(fullFileName);
		fileWriteExecutor = new ThreadPoolExecutor(MAX_THREAD_WRITE, MAX_THREAD_WRITE, Integer.MAX_VALUE,
				TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
				new NamedThreadFactory("fileTransactionStore", MAX_THREAD_WRITE, true));
		writeDataFileRunnable = new WriteDataFileRunnable();
		fileWriteExecutor.submit(writeDataFileRunnable);
		this.sessionManager = sessionManager;
	}

	private void initFile(String fullFileName) throws IOException {
		this.currFullFileName = fullFileName;
		this.hisFullFileName = fullFileName + HIS_DATA_FILENAME_POSTFIX;
		try {
			currDataFile = new File(currFullFileName);
			if (!currDataFile.exists()) {
				// create parent dir first
				if (currDataFile.getParentFile() != null && !currDataFile.getParentFile().exists()) {
					currDataFile.getParentFile().mkdirs();
				}
				currDataFile.createNewFile();
				trxStartTimeMills = System.currentTimeMillis();
			}
			else {
				trxStartTimeMills = currDataFile.lastModified();
			}
			lastModifiedTime = System.currentTimeMillis();
			currRaf = new RandomAccessFile(currDataFile, "rw");
			currRaf.seek(currDataFile.length());
			currFileChannel = currRaf.getChannel();
		}
		catch (IOException exx) {
			LOGGER.error("init file error,{}", exx.getMessage(), exx);
			throw exx;
		}
	}

	@Override
	public boolean writeSession(LogOperation logOperation, SessionStorable session) {
		long curFileTrxNum;
		writeSessionLock.lock();
		try {
			if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
				return false;
			}
			lastModifiedTime = System.currentTimeMillis();
			curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
			if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0
					&& (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
				return saveHistory();
			}
		}
		catch (Exception exx) {
			LOGGER.error("writeSession error, {}", exx.getMessage(), exx);
			return false;
		}
		finally {
			writeSessionLock.unlock();
		}
		flushDisk(curFileTrxNum, currFileChannel);
		return true;
	}

	private void flushDisk(long curFileNum, FileChannel currFileChannel) {

		if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {
			SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel);
			writeDataFileRunnable.putRequest(syncFlushRequest);
			syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS);
		}
		else {
			writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel));
		}
	}

	/**
	 * get all overTimeSessionStorables merge write file
	 * @throws IOException
	 */
	private boolean saveHistory() throws IOException {
		boolean result;
		try {
			result = findTimeoutAndSave();
			CloseFileRequest request = new CloseFileRequest(currFileChannel, currRaf);
			writeDataFileRunnable.putRequest(request);
			request.waitForClose(MAX_WAIT_FOR_CLOSE_TIME_MILLS);
			Files.move(currDataFile.toPath(), new File(hisFullFileName).toPath(), StandardCopyOption.REPLACE_EXISTING);
		}
		catch (IOException exx) {
			LOGGER.error("save history data file error, {}", exx.getMessage(), exx);
			result = false;
		}
		finally {
			initFile(currFullFileName);
		}
		return result;
	}

	private boolean writeDataFrame(byte[] data) {
		if (data == null || data.length <= 0) {
			return true;
		}
		int dataLength = data.length;
		int bufferRemainingSize = writeBuffer.remaining();
		if (bufferRemainingSize <= INT_BYTE_SIZE) {
			if (!flushWriteBuffer(writeBuffer)) {
				return false;
			}
		}
		bufferRemainingSize = writeBuffer.remaining();
		if (bufferRemainingSize <= INT_BYTE_SIZE) {
			throw new IllegalStateException(
					String.format("Write buffer remaining size %d was too small", bufferRemainingSize));
		}
		writeBuffer.putInt(dataLength);
		bufferRemainingSize = writeBuffer.remaining();
		int dataPos = 0;
		while (dataPos < dataLength) {
			int dataLengthToWrite = dataLength - dataPos;
			dataLengthToWrite = Math.min(dataLengthToWrite, bufferRemainingSize);
			writeBuffer.put(data, dataPos, dataLengthToWrite);
			bufferRemainingSize = writeBuffer.remaining();
			if (bufferRemainingSize == 0) {
				if (!flushWriteBuffer(writeBuffer)) {
					return false;
				}
				bufferRemainingSize = writeBuffer.remaining();
			}
			dataPos += dataLengthToWrite;
		}
		return true;
	}

	private boolean flushWriteBuffer(ByteBuffer writeBuffer) {
		BufferUtils.flip(writeBuffer);
		if (!writeDataFileByBuffer(writeBuffer)) {
			return false;
		}
		BufferUtils.clear(writeBuffer);
		return true;
	}

	private boolean findTimeoutAndSave() throws IOException {
		List<GlobalSession> globalSessionsOverMaxTimeout = sessionManager
			.findGlobalSessions(new SessionCondition(MAX_TRX_TIMEOUT_MILLS));
		if (CollectionUtils.isEmpty(globalSessionsOverMaxTimeout)) {
			return true;
		}
		for (GlobalSession globalSession : globalSessionsOverMaxTimeout) {
			TransactionWriteStore globalWriteStore = new TransactionWriteStore(globalSession, LogOperation.GLOBAL_ADD);
			byte[] data = globalWriteStore.encode();
			if (!writeDataFrame(data)) {
				return false;
			}
			List<BranchSession> branchSessIonsOverMaXTimeout = globalSession.getSortedBranches();
			if (branchSessIonsOverMaXTimeout != null) {
				for (BranchSession branchSession : branchSessIonsOverMaXTimeout) {
					try {
						MDC.put(MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
						TransactionWriteStore branchWriteStore = new TransactionWriteStore(branchSession,
								LogOperation.BRANCH_ADD);
						data = branchWriteStore.encode();
						if (!writeDataFrame(data)) {
							return false;
						}
					}
					finally {
						MDC.remove(MDC_KEY_BRANCH_ID);
					}
				}
			}
		}
		if (flushWriteBuffer(writeBuffer)) {
			currFileChannel.force(false);
			return true;
		}
		return false;
	}

	@Override
	public GlobalSession readSession(String xid) {
		throw new StoreException("unsupport for read from file, xid:" + xid);
	}

	@Override
	public List<GlobalSession> readSession(SessionCondition sessionCondition) {
		throw new StoreException("unsupport for read from file");
	}

	@Override
	public void shutdown() {
		if (fileWriteExecutor != null) {
			fileWriteExecutor.shutdown();
			stopping = true;
			int retry = 0;
			while (!fileWriteExecutor.isTerminated() && retry < MAX_SHUTDOWN_RETRY) {
				++retry;
				try {
					Thread.sleep(SHUTDOWN_CHECK_INTERVAL);
				}
				catch (InterruptedException ignore) {
				}
			}
			if (retry >= MAX_SHUTDOWN_RETRY) {
				fileWriteExecutor.shutdownNow();
			}
		}
		try {
			if (currFileChannel.isOpen()) {
				currFileChannel.force(true);
			}
		}
		catch (IOException e) {
			LOGGER.error("fileChannel force error: {}", e.getMessage(), e);
		}
		closeFile(currRaf);
	}

	@Override
	public List<TransactionWriteStore> readWriteStore(int readSize, boolean isHistory) {
		File file = null;
		long currentOffset = 0;
		if (isHistory) {
			file = new File(hisFullFileName);
			currentOffset = recoverHisOffset;
		}
		else {
			file = new File(currFullFileName);
			currentOffset = recoverCurrOffset;
		}
		if (file.exists()) {
			return parseDataFile(file, readSize, currentOffset, isHistory);
		}
		return null;
	}

	@Override
	public boolean hasRemaining(boolean isHistory) {
		File file;
		RandomAccessFile raf = null;
		long currentOffset;
		if (isHistory) {
			file = new File(hisFullFileName);
			currentOffset = recoverHisOffset;
		}
		else {
			file = new File(currFullFileName);
			currentOffset = recoverCurrOffset;
		}
		try {
			raf = new RandomAccessFile(file, "r");
			return currentOffset < raf.length();

		}
		catch (IOException ignore) {
		}
		finally {
			closeFile(raf);
		}
		return false;
	}

	private List<TransactionWriteStore> parseDataFile(File file, int readSize, long currentOffset, boolean isHistory) {
		List<TransactionWriteStore> transactionWriteStores = new ArrayList<>(readSize);
		RandomAccessFile raf = null;
		FileChannel fileChannel = null;
		try {
			raf = new RandomAccessFile(file, "r");
			raf.seek(currentOffset);
			fileChannel = raf.getChannel();
			fileChannel.position(currentOffset);
			long size = raf.length();
			ByteBuffer buffSize = ByteBuffer.allocate(MARK_SIZE);
			while (fileChannel.position() < size) {
				try {
					BufferUtils.clear(buffSize);
					int avilReadSize = fileChannel.read(buffSize);
					if (avilReadSize != MARK_SIZE) {
						break;
					}
					BufferUtils.flip(buffSize);
					int bodySize = buffSize.getInt();
					byte[] byBody = new byte[bodySize];
					ByteBuffer buffBody = ByteBuffer.wrap(byBody);
					avilReadSize = fileChannel.read(buffBody);
					if (avilReadSize != bodySize) {
						break;
					}
					TransactionWriteStore writeStore = new TransactionWriteStore();
					writeStore.decode(byBody);
					transactionWriteStores.add(writeStore);
					if (transactionWriteStores.size() == readSize) {
						break;
					}
				}
				catch (Exception ex) {
					LOGGER.error("decode data file error:{}", ex.getMessage(), ex);
					break;
				}
			}
			return transactionWriteStores;
		}
		catch (IOException exx) {
			LOGGER.error("parse data file error:{},file:{}", exx.getMessage(), file.getName(), exx);
			return null;
		}
		finally {
			try {
				if (fileChannel != null) {
					if (isHistory) {
						recoverHisOffset = fileChannel.position();
					}
					else {
						recoverCurrOffset = fileChannel.position();
					}
				}
				closeFile(raf);
			}
			catch (IOException exx) {
				LOGGER.error("file close error{}", exx.getMessage(), exx);
			}
		}
	}

	private void closeFile(RandomAccessFile raf) {
		try {
			if (raf != null) {
				raf.close();
				raf = null;
			}
		}
		catch (IOException exx) {
			LOGGER.error("file close error,{}", exx.getMessage(), exx);
		}
	}

	private boolean writeDataFile(byte[] bs) {
		if (bs == null || bs.length >= Integer.MAX_VALUE - 3) {
			return false;
		}
		if (!writeDataFrame(bs)) {
			return false;
		}
		return flushWriteBuffer(writeBuffer);
	}

	private boolean writeDataFileByBuffer(ByteBuffer byteBuffer) {
		for (int retry = 0; retry < MAX_WRITE_RETRY; retry++) {
			try {
				while (byteBuffer.hasRemaining()) {
					currFileChannel.write(byteBuffer);
				}
				return true;
			}
			catch (Exception exx) {
				LOGGER.error("write data file error:{}", exx.getMessage(), exx);
			}
		}
		LOGGER.error("write dataFile failed,retry more than :{}", MAX_WRITE_RETRY);
		return false;
	}

	interface StoreRequest {

	}

	abstract static class AbstractFlushRequest implements StoreRequest {

		private final long curFileTrxNum;

		private final FileChannel curFileChannel;

		protected AbstractFlushRequest(long curFileTrxNum, FileChannel curFileChannel) {
			this.curFileTrxNum = curFileTrxNum;
			this.curFileChannel = curFileChannel;
		}

		public long getCurFileTrxNum() {
			return curFileTrxNum;
		}

		public FileChannel getCurFileChannel() {
			return curFileChannel;
		}

	}

	class SyncFlushRequest extends AbstractFlushRequest {

		private final CountDownLatch countDownLatch = new CountDownLatch(1);

		public SyncFlushRequest(long curFileTrxNum, FileChannel curFileChannel) {
			super(curFileTrxNum, curFileChannel);
		}

		public void wakeup() {
			this.countDownLatch.countDown();
		}

		public void waitForFlush(long timeout) {
			try {
				this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
			}
			catch (InterruptedException e) {
				LOGGER.error("Interrupted", e);
			}
		}

	}

	class AsyncFlushRequest extends AbstractFlushRequest {

		public AsyncFlushRequest(long curFileTrxNum, FileChannel curFileChannel) {
			super(curFileTrxNum, curFileChannel);
		}

	}

	static class CloseFileRequest implements StoreRequest {

		private final CountDownLatch countDownLatch = new CountDownLatch(1);

		private FileChannel fileChannel;

		private RandomAccessFile file;

		public CloseFileRequest(FileChannel fileChannel, RandomAccessFile file) {
			this.fileChannel = fileChannel;
			this.file = file;
		}

		public FileChannel getFileChannel() {
			return fileChannel;
		}

		public RandomAccessFile getFile() {
			return file;
		}

		public void wakeup() {
			this.countDownLatch.countDown();
		}

		public void waitForClose(long timeout) {
			try {
				this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
			}
			catch (InterruptedException e) {
				LOGGER.error("Interrupted", e);
			}
		}

	}

	/**
	 * The type Write data file runnable.
	 */
	class WriteDataFileRunnable implements Runnable {

		private LinkedBlockingQueue<StoreRequest> storeRequests = new LinkedBlockingQueue<>();

		public void putRequest(final StoreRequest request) {
			storeRequests.add(request);
		}

		@Override
		public void run() {
			while (!stopping) {
				try {
					StoreRequest storeRequest = storeRequests.poll(MAX_WAIT_TIME_MILLS, TimeUnit.MILLISECONDS);
					handleStoreRequest(storeRequest);
				}
				catch (Exception exx) {
					LOGGER.error("write file error: {}", exx.getMessage(), exx);
				}
			}
			handleRestRequest();
		}

		/**
		 * handle the rest requests when stopping is true
		 */
		private void handleRestRequest() {
			int remainNums = storeRequests.size();
			for (int i = 0; i < remainNums; i++) {
				handleStoreRequest(storeRequests.poll());
			}
		}

		private void handleStoreRequest(StoreRequest storeRequest) {
			if (storeRequest == null) {
				flushOnCondition(currFileChannel);
			}
			if (storeRequest instanceof SyncFlushRequest) {
				syncFlush((SyncFlushRequest) storeRequest);
			}
			else if (storeRequest instanceof AsyncFlushRequest) {
				async((AsyncFlushRequest) storeRequest);
			}
			else if (storeRequest instanceof CloseFileRequest) {
				closeAndFlush((CloseFileRequest) storeRequest);
			}
		}

		private void closeAndFlush(CloseFileRequest req) {
			long diff = FILE_TRX_NUM.get() - FILE_FLUSH_NUM.get();
			flush(req.getFileChannel());
			FILE_FLUSH_NUM.addAndGet(diff);
			closeFile(req.getFile());
			req.wakeup();
		}

		private void async(AsyncFlushRequest req) {
			flushOnCondition(req.getCurFileChannel());
		}

		private void syncFlush(SyncFlushRequest req) {
			if (req.getCurFileTrxNum() > FILE_FLUSH_NUM.get()) {
				long diff = FILE_TRX_NUM.get() - FILE_FLUSH_NUM.get();
				flush(req.getCurFileChannel());
				FILE_FLUSH_NUM.addAndGet(diff);
			}
			// notify
			req.wakeup();
		}

		private void flushOnCondition(FileChannel fileChannel) {
			if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {
				return;
			}
			long diff = FILE_TRX_NUM.get() - FILE_FLUSH_NUM.get();
			if (diff == 0) {
				return;
			}
			if (diff % MAX_FLUSH_NUM == 0 || System.currentTimeMillis() - lastModifiedTime > MAX_FLUSH_TIME_MILLS) {
				flush(fileChannel);
				FILE_FLUSH_NUM.addAndGet(diff);
			}
		}

		private void flush(FileChannel fileChannel) {
			try {
				fileChannel.force(false);
			}
			catch (IOException exx) {
				LOGGER.error("flush error: {}", exx.getMessage(), exx);
			}
		}

	}

}
